"""
Case Type   : 发布订阅
Case Name   : 同步基础数据时,存在增量数据
Create At   : 2023/04/03
Owner       : cr13
Description :
    1.在两个集群创建表
    2.插入表数据
    3.创建发布订阅,创建发布订阅后立即修改表数据
    4.订阅端查询数据是否更新
    5.清理环境
Expect      :
    1.成功
    2.成功
    3.成功
    4.更新
    5.成功
History     : 
"""

import os
import unittest
from yat.test import macro
from yat.test import Node
from testcase.utils.Common import Common
from testcase.utils.Logger import Logger
from testcase.utils.CommonSH import CommonSH
from testcase.utils.Constant import Constant
from testcase.utils.ComThread import ComThread

Primary_SH = CommonSH('PrimaryDbUser')


@unittest.skipIf(3 != Primary_SH.get_node_num(), '非1+2环境不执行')
class Pubsubclass(unittest.TestCase):
    def setUp(self):
        self.log = Logger()
        self.log.info("-----------this is setup-----------")
        self.log.info(f"-----{os.path.basename(__file__)} start-----")
        self.pri_userdb_pub = Node(node='PrimaryDbUser')
        self.pri_rootdb_pub = Node(node='PrimaryRoot')
        self.pri_userdb_sub = Node(node='remote1_PrimaryDbUser')
        self.pri_rootdb_sub = Node(node='remote1_PrimaryRoot')
        self.constant = Constant()
        self.commsh_pub = CommonSH('PrimaryDbUser')
        self.commsh_sub = CommonSH('remote1_PrimaryDbUser')
        self.com_pub = Common()
        self.com_sub = Common('remote1_PrimaryDbUser')
        self.tbname1 = 't_pub_sub_0256_1'
        self.tbname2 = 't_pub_sub_0256_2'
        self.subname = "s_pub_sub_0256"
        self.pubname = "p_pub_sub_0256"
        self.parent_path_pub = os.path.dirname(macro.DB_INSTANCE_PATH)
        self.parent_path_sub = os.path.dirname(macro.DB_INSTANCE_PATH_REMOTE1)
        self.port = str(int(self.pri_userdb_pub.db_port) + 1)
        self.wal_level = self.com_pub.show_param("wal_level")
        self.user_param_pub = f'-U {self.pri_userdb_pub.db_user} ' \
            f'-W {self.pri_userdb_pub.db_password}'
        self.user_param_sub = f'-U {self.pri_userdb_sub.db_user} ' \
            f'-W {self.pri_userdb_sub.db_password}'
        cmd = f"cp " \
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(result)
        cmd = f"cp " \
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')}" \
            f" {os.path.join(self.parent_path_sub, 'pg_hba.conf')};"
        self.log.info(cmd)
        result = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(result)
        self.hostname = self.pri_userdb_sub.sh('hostname').result().strip()
        text = "step:预置条件,修改pg_hba expect:成功"
        self.log.info(text)
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    replication  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)
        guc_res = self.commsh_pub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, '',
            f'host    all  {self.pri_userdb_sub.db_user} '
            f'{self.pri_userdb_sub.db_host}/32 sha256')
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)
        result = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG, 'wal_level=logical')
        self.assertTrue(result, '执行失败' + text)
        result = self.commsh_pub.restart_db_cluster(True)
        flg = self.constant.START_SUCCESS_MSG in result or 'Degrade' in result
        self.assertTrue(flg, '执行失败' + text)
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host    replication  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)
        guc_res = self.commsh_sub.execute_gsguc(
            'reload', self.constant.GSGUC_SUCCESS_MSG, '',
            'all', False, False, macro.DB_INSTANCE_PATH_REMOTE1,
            f'host all  {self.pri_userdb_pub.db_user} '
            f'{self.pri_userdb_pub.db_host}/32 sha256',
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(guc_res)
        self.assertTrue(guc_res, '执行失败' + text)

    def test_pub_sub(self):
        text = "step1:在两个集群创建表 expect:成功"
        self.log.info(text)
        sql_cmd = f"drop table if exists {self.tbname1},{self.tbname2};" \
            f"create table {self.tbname1}(id int primary key,more text);" \
            f"create table {self.tbname2}(id int primary key,more text) with " \
            f"(storage_type=ustore);"
        pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(pub_result)
        self.assertIn(self.constant.DROP_TABLE_SUCCESS, pub_result,
                      '执行失败' + text)
        self.assertEqual(pub_result.count(self.constant.TABLE_CREATE_SUCCESS),
                         4, '执行失败' + text)
        sub_result = self.commsh_sub.execut_db_sql(
            sql_cmd, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertIn(self.constant.DROP_TABLE_SUCCESS, sub_result,
                      '执行失败' + text)
        self.assertEqual(sub_result.count(self.constant.TABLE_CREATE_SUCCESS),
                         4, '执行失败' + text)

        text = "step2:插入表数据 expect:成功"
        self.log.info(text)
        sql_cmd = f"insert into {self.tbname1} values" \
            f"(generate_series(1,100000),'test');insert into {self.tbname2} " \
            f"values(generate_series(1,100000),'test');"
        pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(pub_result)
        self.assertEqual(pub_result.count(self.constant.INSERT_SUCCESS_MSG),
                         2, '执行失败' + text)

        text = "step3:创建发布订阅,创建发布订阅后立即修改表数据 " \
               "expect:成功"
        self.log.info(text)
        sql_cmd = f"drop publication if exists {self.pubname};" \
            f"create publication {self.pubname} for all tables;"
        pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(pub_result)
        self.assertIn(self.constant.drop_pub_succ_msg, pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.create_pub_succ_msg, pub_result,
                      '执行失败' + text)
        result = self.commsh_sub.execute_generate(
            macro.COMMON_PASSWD, env_path=macro.DB_ENV_PATH_REMOTE1)
        self.assertIn(self.constant.create_keycipher_success, result,
                      '执行失败' + text)
        sql_cmd = f"drop subscription if exists {self.subname};" \
            f"create subscription {self.subname} connection " \
            f"'host={self.pri_userdb_pub.db_host} " \
            f"port={self.port} user={self.pri_userdb_pub.db_user} " \
            f"dbname={self.pri_userdb_pub.db_name} " \
            f"password={self.pri_userdb_pub.db_password}' publication " \
            f"{self.pubname} with (copy_data=true);"
        sub_result = self.commsh_sub.execut_db_sql(
            sql_cmd, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertIn(self.constant.drop_sub_succ_msg, sub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.create_sub_succ_msg, sub_result, '执行失败'
                      + text)

        sql_cmd = f"insert into {self.tbname1} values(generate_series(100001," \
            f"109999), 'step4');update {self.tbname1} set more='update' " \
            f"where id<3000;"
        session1 = ComThread(self.commsh_pub.execut_db_sql, args=(
            sql_cmd, self.user_param_pub))
        session1.setDaemon(True)

        sql_cmd = f"delete from {self.tbname2} where id>3000 and id<10000;" \
            f"insert into {self.tbname2} values(generate_series(4000,6000), " \
            f"'insert');"
        session2 = ComThread(self.commsh_pub.execut_db_sql, args=(
            sql_cmd, self.user_param_pub))
        session2.setDaemon(True)
        session1.start()
        session2.start()
        session1.join(15)
        session2.join(15)
        result1 = session1.get_result()
        self.log.info(result1)
        result2 = session2.get_result()
        self.log.info(result2)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, result1, '执行失败' +
                      text)
        self.assertIn(self.constant.UPDATE_SUCCESS_MSG, result1, '执行失败' +
                      text)
        self.assertIn(self.constant.DELETE_SUCCESS_MSG, result2, '执行失败' +
                      text)
        self.assertIn(self.constant.INSERT_SUCCESS_MSG, result2, '执行失败' +
                      text)

        text = "step4:订阅端查询数据是否更新 expect:更新"
        self.log.info(text)
        sql_select = f"select pg_sleep(8);select count(*) from " \
            f"{self.tbname1};select count(*) from {self.tbname1} where more=" \
            f"'update';"
        sub_result = self.commsh_sub.execut_db_sql(
            sql_select, self.user_param_sub, None,
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertEqual(str(sub_result.splitlines()[-7]).strip(), '109999',
                         '执行失败' + text)
        self.assertEqual(str(sub_result.splitlines()[-2]).strip(), '2999',
                         '执行失败' + text)
        sql_select = f"select pg_sleep(8);select count(*) from " \
            f"{self.tbname2};select count(*) from {self.tbname2} where more=" \
            f"'insert';"
        sub_result = self.commsh_sub.execut_db_sql(
            sql_select, self.user_param_sub, None,
            macro.DB_ENV_PATH_REMOTE1)
        self.log.info(sub_result)
        self.assertEqual(str(sub_result.splitlines()[-7]).strip(), '95002',
                         '执行失败' + text)
        self.assertEqual(str(sub_result.splitlines()[-2]).strip(), '2001',
                         '执行失败' + text)

    def tearDown(self):
        text = "step5:清理环境 expect:成功"
        self.log.info(text)
        sql_cmd = f"drop subscription if exists {self.subname};"
        drop_sub_result = self.commsh_sub.execut_db_sql(
            sql_cmd, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(drop_sub_result)
        sql_cmd = f"drop publication if exists {self.pubname};"
        drop_pub_result = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(drop_pub_result)
        sql_cmd = f"drop table if exists {self.tbname1},{self.tbname2};"
        result_sub = self.commsh_sub.execut_db_sql(
            sql_cmd, self.user_param_sub, None, macro.DB_ENV_PATH_REMOTE1)
        self.log.info(result_sub)
        result_pub = self.commsh_pub.execut_db_sql(
            sql_cmd, sql_type=self.user_param_pub)
        self.log.info(result_pub)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_pub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH, 'pg_hba.conf')} "
        self.log.info(cmd)
        sh_result1 = self.pri_userdb_pub.sh(cmd).result()
        self.log.info(sh_result1)
        cmd = f"mv " \
            f"{os.path.join(self.parent_path_sub, 'pg_hba.conf')} " \
            f"{os.path.join(macro.DB_INSTANCE_PATH_REMOTE1, 'pg_hba.conf')} "
        self.log.info(cmd)
        sh_result2 = self.pri_userdb_sub.sh(cmd).result()
        self.log.info(sh_result2)
        result1 = self.commsh_pub.execute_gsguc(
            'set', self.constant.GSGUC_SUCCESS_MSG,
            f'wal_level={self.wal_level}')
        self.log.info(result1)
        pub_result = self.commsh_pub.restart_db_cluster(True)
        sub_result = self.commsh_sub.restart_db_cluster(
            True, macro.DB_ENV_PATH_REMOTE1)
        self.assertTrue(pub_result, '执行失败' + text)
        self.assertTrue(sub_result, '执行失败' + text)
        self.assertTrue(result1, '执行失败' + text)
        self.assertEqual("", sh_result1, '执行失败' + text)
        self.assertEqual("", sh_result2, '执行失败' + text)
        self.assertIn(self.constant.drop_pub_succ_msg, drop_pub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.drop_sub_succ_msg, drop_sub_result,
                      '执行失败' + text)
        self.assertIn(self.constant.DROP_TABLE_SUCCESS, result_sub,
                      '执行失败' + text)
        self.assertIn(self.constant.DROP_TABLE_SUCCESS, result_pub,
                      '执行失败' + text)
        self.log.info(f"-----{os.path.basename(__file__)} end-----")

